package org.zjvis.datascience.service;

import cn.hutool.db.Entity;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.i18n.phonenumbers.Phonenumber.PhoneNumber;
import com.google.i18n.phonenumbers.geocoding.PhoneNumberOfflineGeocoder;
import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import com.mayabot.nlp.fasttext.FastText;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.zjvis.datascience.common.constant.DatasetConstant;
import org.zjvis.datascience.common.constant.SemanticConstant;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.enums.ActionEnum;
import org.zjvis.datascience.common.enums.DataTypeEnum;
import org.zjvis.datascience.common.enums.SemanticSubEnum;
import org.zjvis.datascience.common.sql.DataCleanSqlHelper;
import org.zjvis.datascience.common.util.DatasetUtil;
import org.zjvis.datascience.common.util.SemanticUtil;
import org.zjvis.datascience.common.util.SqlUtil;
import org.zjvis.datascience.common.util.ToolUtil;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.common.vo.dataset.HeadVO;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;

import javax.annotation.PostConstruct;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;

/**
 * @description Semantic service: recommends, validates and transforms column semantic types
 * @date 2021-12-20
 */
@Service
public class SemanticService {

    @Autowired
    private FastTextService fastTextService;

    @Autowired
    private GPDataProvider gpDataProvider;

    @Autowired
    private UrbanDataService urbanDataService;

    private final static Logger logger = LoggerFactory.getLogger(SemanticService.class);

    // Default weight passed to setScore: the share of the existing (name-similarity) score kept
    // when it is blended with a data-based score.
    private final static float WEIGHT = 0.3f;
    // Threshold used by checkCity, checkCategory and checkNull to decide whether a score is significant.
    private final static float THRES = 0.5f;
    // Maximum number of failing values collected before a check gives up and returns 0.
    private final static int FAIL = 20;// must be no smaller than the preview data size: 20

    // Reference data and model, populated once by init().
    private Set<String> countryList;
    private Set<String> provinceList;
    private Set<String> cityList;
    private Set<String> postcodeList;
    private FastText model;
    // Text vector of the word "city", compared against values in checkCity.
    private double[] cityVector;

    /**
     * Loads the country/province/city/postcode reference sets from the mapper tables and
     * prepares the fastText model plus the vector for the word "city".
     *
     * Loading stays best-effort: a failure is logged (with its stack trace) and does not
     * abort bean creation, so fields assigned after the failing step remain {@code null}.
     */
    @PostConstruct
    private void init() {
        try {
            countryList = gpDataProvider.executeQueryAsOneSet(
                    String.format("select %s from dataset._country_mapper_", Joiner.on(",").join(
                            SemanticConstant.COUNTRY_COL)));
            provinceList = gpDataProvider.executeQueryAsOneSet(String
                    .format("select %s from dataset._province_mapper_",
                            Joiner.on(",").join(SemanticConstant.PROVINCE_COL)));
            cityList = gpDataProvider.executeQueryAsOneSet(String
                    .format("select %s from dataset._city_mapper_",
                            Joiner.on(",").join(SemanticConstant.CITY_COL)));
            postcodeList = gpDataProvider.executeQueryAsOneSet("select zipcode from dataset._city_mapper_");
            model = fastTextService.getAutojoinModel();
            cityVector = fastTextService.getTextVector(model, "city");
        } catch (Exception e) {
            // Pass the throwable as the last argument so SLF4J records the full stack trace.
            logger.error("Failed to initialize semantic reference data: {}", e.getMessage(), e);
        }
    }

    /**
     * @return the country names loaded by {@code init()}
     */
    public Set<String> getCountryList() {
        return countryList;
    }

    /**
     * @return the province names loaded by {@code init()}
     */
    public Set<String> getProvinceList() {
        return provinceList;
    }

    /**
     * @return the city names loaded by {@code init()}
     */
    public Set<String> getCityList() {
        return cityList;
    }

    /**
     * Longitude pattern: optional sign, 0-180 in decimal or degree/minute/second notation,
     * optional E/W suffix.
     */
    final static String LON_REGEX =
            "([-+]?((0?\\d{1,2}|1[0-7]\\d)(°?|\\.\\d{1,15}°?|°\\d{1,2}\\.\\d{1,15}\'|°\\d{1,2}\'(\\d{1,2}\")?)|180°?)[EW]?)";

    /**
     * Latitude pattern: optional sign, 0-90 in decimal or degree/minute/second notation,
     * optional N/S suffix.
     */
    final static String LAT_REGEX =
            "([-+]?([0-8]?\\d(°?|\\.\\d{1,15}°?|°\\d{1,2}\\.\\d{1,15}\'|°\\d{1,2}\'(\\d{1,2}\")?)|90°?)[NS]?)";

    /**
     * Coordinate pattern: a longitude, a comma, at most one whitespace character, then a latitude.
     */
    final static String COORDINATE_REGEX = String.format("%s,\\s?%s", LON_REGEX, LAT_REGEX);

    /**
     * IPv4 address pattern.
     */
    final static String IPV4_REGEX =
            "((25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)(\\.(25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)){3})";

    /**
     * Standard IPv6 address pattern: eight colon-separated groups, without the "::" shorthand
     * for all-zero blocks.
     */
    final static String IPV6_STD_REGEX =
            "(([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4})";

    /**
     * Compressed IPv6 address pattern (uses "::").
     */
    final static String IPV6_COMPRESS_REGEX =
            "((([0-9A-Fa-f]{1,4}(:[0-9A-Fa-f]{1,4})*)?)::((([0-9A-Fa-f]{1,4}:)*[0-9A-Fa-f]{1,4})?))";

    /**
     * IP address pattern: IPv4, standard IPv6 or compressed IPv6.
     */
    final static String IP_REGEX = String
            .format("%s|%s|%s", IPV4_REGEX, IPV6_STD_REGEX, IPV6_COMPRESS_REGEX);

    /**
     * URL pattern for the http, https, ftp and ftps schemes.
     */
    final static String URL_REGEX = "(((ht|f)tps?):\\/\\/[\\w\\-]+(\\.[\\w\\-]+)+([\\w\\-\\.,@?^=%&:\\/~\\+#]*[\\w\\-\\@?^=%&\\/~\\+#])?)";

    /**
     * Email address pattern.
     * NOTE(review): in the first character class the unescaped hyphen after the underscore sits
     * directly before the CJK range, so it may be parsed as a range operator rather than a
     * literal hyphen - confirm the class matches what is intended.
     */
    final static String EMAIL_REGEX = "[A-Za-z0-9_-\\u4e00-\\u9fa5]+@[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+";

    /**
     * Telephone pattern: mobile and landline numbers, optionally prefixed with an international
     * dialing code and/or an area code (the area code alone is also allowed).
     */
    final static String TELEPHONE_REGEX = "((00|\\+)?\\d{1,3}[-\\s])?(\\d{2,4}[-\\s]?|\\(\\d{2,4}\\)[-\\s]?)?(\\d{3,4}[-\\s]?\\d{4}|1[3458]\\d[-\\s]?\\d{4}[-\\s]?\\d{4})";

    /**
     * Passport pattern. Ordinary e-passport: E + 8 digits; official e-passport: two letters
     * (e.g. SE/DE/PE) + 7 digits; Macao SAR passport: MA + 7 digits; Hong Kong SAR passport:
     * K + 8 digits.
     */
    final static String PASSPORT_REGEX = "[EK]\\d{8}|[A-Z]{2}\\d{7}";

    // Orders map entries by value, highest score first.
    Comparator<Map.Entry<String, Float>> valueComparator =
            (left, right) -> right.getValue().compareTo(left.getValue());

    /**
     * Recommends a semantic for every column head with the {@code limit} flag switched off.
     *
     * @param heads column heads to annotate
     * @param data  preview rows the column values are read from
     */
    public void recommendSemantic(List<HeadVO> heads, List<Entity> data) {
        final boolean limit = false;
        this.recommendSemantic(heads, data, limit);
    }

    /**
     * Recommends a semantic for every column head based on its name and the distinct,
     * non-blank values found for that column in {@code data}.
     *
     * @param heads column heads to annotate; each one is passed to {@code checkSemantic}
     * @param data  preview rows; empty rows are skipped
     * @param limit forwarded unchanged to {@code checkSemantic}
     */
    public void recommendSemantic(List<HeadVO> heads, List<Entity> data, boolean limit) {
        for (HeadVO head : heads) {
            String name = head.getName();
            // Typed set instead of the raw HashSet used before.
            Set<String> colData = new HashSet<>();
            for (Entity entity : data) {
                if (entity.isEmpty()) {
                    continue;
                }
                String str = entity.getStr(name);
                if (str != null && !str.trim().isEmpty()) {
                    colData.add(str);
                }
            }
            checkSemantic(head, colData, limit);
        }
    }

    /**
     * Blends a new score into the stored score for {@code k}:
     * {@code weight * existing + (1 - weight) * v}.
     *
     * @param semanticScore score table to update in place
     * @param k             semantic key
     * @param v             newly computed score
     * @param weight        share of the existing score that is kept
     */
    public void setScore(Map<String, Float> semanticScore, String k, float v, float weight) {
        // A missing (or null) entry is treated as 0 instead of failing on auto-unboxing.
        Float previous = semanticScore.get(k);
        float base = previous == null ? 0f : previous;
        semanticScore.put(k, weight * base + (1 - weight) * v);
    }

    /**
     * Scores every {@link SemanticSubEnum} for one column and stores the best one on the head.
     *
     * The initial score of each semantic is the vector similarity between the column name and
     * the semantic name (ORDER, DISORDER and NULL start at 0). When values are available, each
     * data-based check is blended in through {@code setScore}; {@code checkCategory} and
     * {@code checkNull} then adjust the table before the highest-scoring key is chosen.
     *
     * @param head  column head; its name and type are read, its recommended semantic is set
     * @param data  distinct non-blank values of the column
     * @param limit forwarded to the checks that accept it
     */
    public void checkSemantic(HeadVO head, Set<String> data, boolean limit) {
        String name = head.getName();
        String newName = ToolUtil.standardString(name, ' ');
        double[] nameVector = fastTextService.getTextVector(model, newName);

        // TreeMap keeps keys sorted, so ties after the stable sort below resolve by key order.
        Map<String, Float> semanticScore = new TreeMap<>();
        for (SemanticSubEnum semantic : SemanticSubEnum.values()) {
            String enumStr = semantic.getVal();
            if (enumStr.equals(SemanticSubEnum.ORDER.getVal()) || enumStr
                    .equals(SemanticSubEnum.DISORDER.getVal()) || enumStr
                    .equals(SemanticSubEnum.NULL.getVal())) {
                semanticScore.put(enumStr, 0f);
            } else {
                double[] enumVector = fastTextService.getTextVector(model, enumStr);
                float similarity = (float) fastTextService
                        .computeVectorSimilarity(nameVector, enumVector);
                semanticScore.put(enumStr, similarity);
            }
        }
        if (!data.isEmpty()) {
            String type = head.getType();
            // Longitude, latitude and HTTP status keep 70% of the name-based score.
            setScore(semanticScore, SemanticSubEnum.LONGITUDE.getVal(),
                    checkLongitude(type, data, new HashSet<>(), limit), 0.7f);
            setScore(semanticScore, SemanticSubEnum.LATITUDE.getVal(),
                    checkLatitude(type, data, new HashSet<>(), limit), 0.7f);
            setScore(semanticScore, SemanticSubEnum.COORDINATE.getVal(),
                    checkCoordinate(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.COUNTRY.getVal(),
                    checkCountry(type, data, new HashSet<>(), limit), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.PROVINCE.getVal(),
                    checkProvince(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.CITY.getVal(),
                    checkCity(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.POSTCODE.getVal(),
                    checkPostcode(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.IP.getVal(),
                    checkIp(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.URL.getVal(),
                    checkUrl(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.HTTP_STATUS.getVal(),
                    checkHttpStatus(type, data, new HashSet<>(), limit), 0.7f);
            setScore(semanticScore, SemanticSubEnum.EMAIL.getVal(),
                    checkEmail(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.TELEPHONE.getVal(),
                    checkTelephone(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.ID.getVal(),
                    checkId(type, data, new HashSet<>()), WEIGHT);
            setScore(semanticScore, SemanticSubEnum.PASSPORT.getVal(),
                    checkPassport(type, data, new HashSet<>()), WEIGHT);
            checkCategory(type, data, semanticScore);
        }
        checkNull(semanticScore);

        // Sort descending by score and take the first entry directly; no intermediate key list needed.
        List<Map.Entry<String, Float>> ranked = new ArrayList<>(semanticScore.entrySet());
        ranked.sort(valueComparator);
        head.setRecommendSemantic(ranked.get(0).getKey());
    }

    /**
     * Regex check that only applies to varchar columns; any other type scores 0.
     *
     * @param regex pattern every value must match
     * @param type  column data type
     * @param data  values to check
     * @param fail  receives the values that do not match
     * @return result of the three-argument {@code checkRegex} for varchar columns, otherwise 0
     */
    public Float checkRegex(String regex, String type, Set<String> data, Set<String> fail) {
        if (!type.equals(DatasetConstant.DATA_VARCHAR)) {
            return 0f;
        }
        return checkRegex(regex, data, fail);
    }

    /**
     * Checks that every (preprocessed) value fully matches {@code regex}.
     *
     * @param regex pattern every value must match
     * @param data  values to check
     * @param fail  receives the original values that do not match; collection stops once it
     *              holds more than {@code FAIL} entries
     * @return 1 when all values match, otherwise 0
     */
    public Float checkRegex(String regex, Set<String> data, Set<String> fail) {
        // Compile once; String.matches would recompile the pattern for every value.
        java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(regex);
        boolean allMatch = true;
        for (String str : data) {
            String preStr = DatasetUtil.preprocessStr(str);
            if (!pattern.matcher(preStr).matches()) {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
                allMatch = false;
            }
        }
        return allMatch ? 1f : 0f;
    }

    /**
     * Checks that every value parses as a number within {@code [left, right]}.
     *
     * Values that are null, not numeric or NaN are recorded as failures instead of raising an
     * exception or silently passing.
     *
     * @param data  values to check
     * @param fail  receives the values outside the range; collection stops once it holds more
     *              than {@code FAIL} entries
     * @param left  inclusive lower bound
     * @param right inclusive upper bound
     * @return 1 when all values are in range, 0 when more than {@code FAIL} values failed,
     *         otherwise 0.01
     */
    public Float checkRange(Set<String> data, Set<String> fail, float left, float right) {
        boolean allInRange = true;
        for (String str : data) {
            boolean inRange = false;
            if (str != null) {
                try {
                    float value = Float.parseFloat(str);
                    // Written as a positive test so that NaN is rejected.
                    inRange = value >= left && value <= right;
                } catch (NumberFormatException notNumeric) {
                    // Non-numeric text (e.g. the "null" placeholder) counts as out of range.
                    inRange = false;
                }
            }
            if (!inRange) {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
                allInRange = false;
            }
        }
        return allInRange ? 1f : 0.01f;
    }

    /**
     * Scores how well the values look like longitudes.
     *
     * Varchar columns are matched against {@code LON_REGEX}; int/decimal columns are range
     * checked against [-180, 180] unless {@code limit} is set; everything else scores 0.
     *
     * @param type  column data type
     * @param data  values to check
     * @param fail  receives the failing values
     * @param limit when true, numeric columns are not considered
     * @return the score of the applicable check, or 0
     */
    public Float checkLongitude(String type, Set<String> data, Set<String> fail, boolean limit) {
        if (type.equals(DatasetConstant.DATA_VARCHAR)) {
            return checkRegex(LON_REGEX, data, fail);
        }
        boolean numeric = type.equals(DatasetConstant.DATA_INT)
                || type.equals(DatasetConstant.DATA_DECIMAL);
        if (numeric && !limit) {
            return checkRange(data, fail, -180, 180);
        }
        return 0f;
    }

    /**
     * Scores how well the values look like latitudes.
     *
     * Varchar columns are matched against {@code LAT_REGEX}; int/decimal columns are range
     * checked against [-90, 90] unless {@code limit} is set; everything else scores 0.
     *
     * @param type  column data type
     * @param data  values to check
     * @param fail  receives the failing values
     * @param limit when true, numeric columns are not considered
     * @return the score of the applicable check, or 0
     */
    public Float checkLatitude(String type, Set<String> data, Set<String> fail, boolean limit) {
        if (type.equals(DatasetConstant.DATA_VARCHAR)) {
            return checkRegex(LAT_REGEX, data, fail);
        }
        boolean numeric = type.equals(DatasetConstant.DATA_INT)
                || type.equals(DatasetConstant.DATA_DECIMAL);
        if (numeric && !limit) {
            return checkRange(data, fail, -90, 90);
        }
        return 0f;
    }

    /**
     * Scores the values against {@code COORDINATE_REGEX} via the type-aware {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkCoordinate(String type, Set<String> data, Set<String> fail) {
        final Float coordinateScore = checkRegex(COORDINATE_REGEX, type, data, fail);
        return coordinateScore;
    }

    /**
     * Returns the fraction of values found in the country reference set.
     *
     * Only varchar columns are considered when {@code limit} is set; otherwise int columns are
     * accepted as well. A leading "The " is stripped from each value before the lookup.
     *
     * @param type  column data type
     * @param data  values to check
     * @param fail  receives the values that are not known countries
     * @param limit restricts the check to varchar columns
     * @return matched fraction in [0, 1]; 0 for unsupported types, empty data, or when more
     *         than {@code FAIL} values failed
     */
    public Float checkCountry(String type, Set<String> data, Set<String> fail, boolean limit) {
        if ((limit && !type.equals(DatasetConstant.DATA_VARCHAR)) ||
                (!limit && !type.equals(DatasetConstant.DATA_VARCHAR) && !type.equals(DatasetConstant.DATA_INT))) {
            return 0f;
        }
        // Avoid 0/0 = NaN when there is nothing to check.
        if (data.isEmpty()) {
            return 0f;
        }
        float cnt = 0f;
        for (String str : data) {
            String preStr = DatasetUtil.preprocessStr(str);
            if (preStr.startsWith("The ")) {
                preStr = preStr.substring(4);
            }
            if (countryList.contains(preStr)) {
                cnt++;
            } else {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
            }
        }
        return cnt / data.size();
    }

    /**
     * Returns the fraction of values found in the province reference set.
     *
     * @param type column data type; only varchar and int columns are considered
     * @param data values to check
     * @param fail receives the values that are not known provinces
     * @return matched fraction in [0, 1]; 0 for unsupported types, empty data, or when more
     *         than {@code FAIL} values failed
     */
    public Float checkProvince(String type, Set<String> data, Set<String> fail) {
        if (!type.equals(DatasetConstant.DATA_VARCHAR) && !type.equals(DatasetConstant.DATA_INT)) {
            return 0f;
        }
        // Avoid 0/0 = NaN when there is nothing to check.
        if (data.isEmpty()) {
            return 0f;
        }
        float cnt = 0f;
        for (String str : data) {
            String preStr = DatasetUtil.preprocessStr(str);
            if (provinceList.contains(preStr)) {
                cnt++;
            } else {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
            }
        }
        return cnt / data.size();
    }

    /**
     * Returns the fraction of values that look like cities.
     *
     * A value counts when it is in the city reference set, or when its text vector is more
     * similar than {@code THRES} to the vector of the word "city".
     *
     * @param type column data type; only varchar and int columns are considered
     * @param data values to check
     * @param fail receives the values that did not qualify
     * @return matched fraction in [0, 1]; 0 for unsupported types, empty data, or when more
     *         than {@code FAIL} values failed
     */
    public Float checkCity(String type, Set<String> data, Set<String> fail) {
        if (!type.equals(DatasetConstant.DATA_VARCHAR) && !type.equals(DatasetConstant.DATA_INT)) {
            return 0f;
        }
        // Avoid 0/0 = NaN when there is nothing to check.
        if (data.isEmpty()) {
            return 0f;
        }
        float cnt = 0f;
        for (String str : data) {
            String preStr = DatasetUtil.preprocessStr(str);
            if (cityList.contains(preStr)) {
                cnt++;
                continue;
            }
            // Not a known city: fall back to embedding similarity.
            double[] nameVector = fastTextService.getTextVector(model, preStr);
            float similarity = (float) fastTextService
                    .computeVectorSimilarity(nameVector, cityVector);
            if (similarity > THRES) {
                cnt++;
            } else {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
            }
        }
        return cnt / data.size();
    }

    /**
     * Returns the fraction of values found in the postcode reference set.
     *
     * @param type column data type; only varchar and int columns are considered
     * @param data values to check
     * @param fail receives the values that are not known postcodes
     * @return matched fraction in [0, 1]; 0 for unsupported types, empty data, or when more
     *         than {@code FAIL} values failed
     */
    public Float checkPostcode(String type, Set<String> data, Set<String> fail) {
        if (!type.equals(DatasetConstant.DATA_VARCHAR) && !type.equals(DatasetConstant.DATA_INT)) {
            return 0f;
        }
        // Avoid 0/0 = NaN when there is nothing to check.
        if (data.isEmpty()) {
            return 0f;
        }
        float cnt = 0f;
        for (String str : data) {
            String preStr = DatasetUtil.preprocessStr(str);
            if (postcodeList.contains(preStr)) {
                cnt++;
            } else {
                fail.add(str);
                if (fail.size() > FAIL) {
                    return 0f;
                }
            }
        }
        return cnt / data.size();
    }

    /**
     * Scores the values against {@code IP_REGEX} (IPv4 or IPv6) via the type-aware
     * {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkIp(String type, Set<String> data, Set<String> fail) {
        final Float ipScore = checkRegex(IP_REGEX, type, data, fail);
        return ipScore;
    }

    /**
     * Scores the values against {@code IPV4_REGEX} via the type-aware {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkIpv4(String type, Set<String> data, Set<String> fail) {
        final Float ipv4Score = checkRegex(IPV4_REGEX, type, data, fail);
        return ipv4Score;
    }

    /**
     * Scores the values against {@code URL_REGEX} via the type-aware {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkUrl(String type, Set<String> data, Set<String> fail) {
        final Float urlScore = checkRegex(URL_REGEX, type, data, fail);
        return urlScore;
    }

    /**
     * Scores int columns by checking every value against the range [100, 600].
     *
     * @param type  column data type; only int columns are considered
     * @param data  values to check
     * @param fail  receives the failing values
     * @param limit when true the check is skipped
     * @return result of {@code checkRange}, or 0 when the check does not apply
     */
    public Float checkHttpStatus(String type, Set<String> data, Set<String> fail, boolean limit) {
        if (!type.equals(DatasetConstant.DATA_INT) || limit) {
            return 0f;
        }
        return checkRange(data, fail, 100, 600);
    }

    /**
     * Scores the values against {@code EMAIL_REGEX} via the type-aware {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkEmail(String type, Set<String> data, Set<String> fail) {
        final Float emailScore = checkRegex(EMAIL_REGEX, type, data, fail);
        return emailScore;
    }

    /**
     * Scores varchar or int columns against {@code TELEPHONE_REGEX}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return result of the regex check, or 0 for other column types
     */
    public Float checkTelephone(String type, Set<String> data, Set<String> fail) {
        boolean supported = type.equals(DatasetConstant.DATA_VARCHAR)
                || type.equals(DatasetConstant.DATA_INT);
        if (!supported) {
            return 0f;
        }
        return checkRegex(TELEPHONE_REGEX, data, fail);
    }

    /**
     * Scores the values against {@code DatasetUtil.ID_REGEX} via the type-aware
     * {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkId(String type, Set<String> data, Set<String> fail) {
        final Float idScore = checkRegex(DatasetUtil.ID_REGEX, type, data, fail);
        return idScore;
    }

    /**
     * Scores the values against {@code PASSPORT_REGEX} via the type-aware {@code checkRegex}.
     *
     * @param type column data type
     * @param data values to check
     * @param fail receives the failing values
     * @return score returned by {@code checkRegex}
     */
    public Float checkPassport(String type, Set<String> data, Set<String> fail) {
        final Float passportScore = checkRegex(PASSPORT_REGEX, type, data, fail);
        return passportScore;
    }

    /**
     * Tells whether a column can be treated as a category: varchar or int type with at most
     * 12 distinct values.
     *
     * @param type column data type
     * @param data distinct values of the column
     * @return 1 when the column qualifies, otherwise 0
     */
    public Float validateCategory(String type, Set<String> data) {
        boolean categoricalType = type.equals(DatasetConstant.DATA_VARCHAR)
                || type.equals(DatasetConstant.DATA_INT);
        return (categoricalType && data.size() <= 12) ? 1f : 0f;
    }

    /**
     * Assigns a DISORDER score when no other semantic is already above {@code THRES}.
     *
     * Applies to varchar or int columns with at most 12 distinct values; the score is
     * {@code (13 - distinctCount) / 12}, so fewer distinct values score higher.
     *
     * @param type          column data type
     * @param data          distinct values of the column
     * @param semanticScore score table, updated in place
     */
    public void checkCategory(String type, Set<String> data, Map<String, Float> semanticScore) {
        boolean alreadyConfident = semanticScore.values().stream()
                .anyMatch(value -> value > THRES);
        if (alreadyConfident) {
            return;
        }
        boolean categoricalType = type.equals(DatasetConstant.DATA_VARCHAR)
                || type.equals(DatasetConstant.DATA_INT);
        if (categoricalType && data.size() <= 12) {
            float disorderScore = (13f - data.size()) / 12;
            semanticScore.put(SemanticSubEnum.DISORDER.getVal(), disorderScore);
        }
    }

    /**
     * Finalizes the score table: scores below 0.2 are reset to 0, and the NULL semantic is set
     * to 1 when no score exceeds {@code THRES}, otherwise to -1.
     *
     * @param semanticScore score table, updated in place
     */
    public void checkNull(Map<String, Float> semanticScore) {
        boolean anyAboveThreshold = false;
        for (Map.Entry<String, Float> entry : semanticScore.entrySet()) {
            float value = entry.getValue();
            if (value > THRES) {
                anyAboveThreshold = true;
            } else if (value < 0.2) {
                // Overwrites an existing key only, so the map structure is not modified mid-iteration.
                semanticScore.put(entry.getKey(), 0f);
            }
        }
        float nullScore = anyAboveThreshold ? -1f : 1f;
        semanticScore.put(SemanticSubEnum.NULL.getVal(), nullScore);
    }

    /**
     * Validates whether a column can be converted from {@code semantic} to {@code toSemantic}.
     *
     * The values checked are the target values of the {@code update} mapping when one is
     * supplied, otherwise the distinct values of column {@code col} in {@code table}.
     *
     * @param param request with keys {@code col}, {@code semantic}, {@code toSemantic},
     *              {@code type}, and either {@code update} or {@code table}
     * @return a JSON object mapping {@code col} to the set of failing values when the transform
     *         scores above 0, or to {@code false} when it is not possible
     */
    public JSONObject validateTransform(JSONObject param) {
        JSONObject fails = new JSONObject();
        Set<String> fail = new LinkedHashSet<>();

        String col = param.getString("col");
        String semantic = param.getString("semantic");
        if (semantic == null) {
            // Same default as semanticTransform; switching on a null string would throw.
            semantic = "null";
        }
        String toSemantic = param.getString("toSemantic");
        if (toSemantic == null) {
            // Without a target semantic nothing can be validated.
            fails.put(col, false);
            return fails;
        }
        String type = param.getString("type");

        Set<String> values;
        JSONObject update = param.getJSONObject("update");
        if (update != null) {
            values = new LinkedHashSet<>();
            for (Map.Entry<String, Object> entry : update.entrySet()) {
                values.add((String) entry.getValue());
            }
        } else {
            if (toSemantic.equals("null")) {
                fails.put(col, fail);
                return fails;
            }

            String tableName = param.getString("table");
            if (!tableName.contains(".")) {
                tableName = "dataset." + tableName;
            }

            // NOTE(review): col and tableName are interpolated as-is; confirm callers pass trusted identifiers.
            String selectSql = String.format("select distinct %s from %s", col, tableName);
            values = gpDataProvider.executeQueryAsOneSet(selectSql);
        }

        float score = scoreTransform(toSemantic, semantic, type, values, fail);
        if (score > 0) {
            fails.put(col, fail);
        } else {
            fails.put(col, false);
        }
        return fails;
    }

    /** Scores a transform to {@code toSemantic}; unknown targets score 0. */
    private float scoreTransform(String toSemantic, String semantic, String type,
                                 Set<String> values, Set<String> fail) {
        switch (toSemantic) {
            case "longitude":
            case "latitude":
            case "coordinate":
            case "country":
            case "province":
            case "city":
            case "postcode":
                return scoreGeoTransform(toSemantic, semantic, type, values, fail);
            case "ip":
                return checkIp(type, values, fail);
            case "httpstatus":
                return checkHttpStatus(type, values, fail, false);
            case "email":
                return checkEmail(type, values, fail);
            case "telephone":
                return checkTelephone(type, values, fail);
            case "id":
                return checkId(type, values, fail);
            case "passport":
                return checkPassport(type, values, fail);
            case "order":
            case "disorder":
                return validateCategory(type, values);
            default:
                return 0f;
        }
    }

    /** Scores a transform to a geographic target, taking the source semantic into account. */
    private float scoreGeoTransform(String toSemantic, String semantic, String type,
                                    Set<String> values, Set<String> fail) {
        // An IP source can only be converted when every value is IPv4.
        if ("ip".equals(semantic)) {
            return checkIpv4(type, values, fail);
        }
        // Some source semantics convert to the target without inspecting the values.
        if (sourcesConvertibleTo(toSemantic).contains(semantic)) {
            return 1f;
        }
        float score;
        switch (toSemantic) {
            case "longitude":
                return checkLongitude(type, values, fail, false);
            case "latitude":
                return checkLatitude(type, values, fail, false);
            case "coordinate":
                return checkCoordinate(type, values, fail);
            case "postcode":
                return checkPostcode(type, values, fail);
            case "country":
                score = checkCountry(type, values, fail, false);
                break;
            case "province":
                score = checkProvince(type, values, fail);
                break;
            case "city":
                score = checkCity(type, values, fail);
                break;
            default:
                return 0f;
        }
        // For country/province/city a match rate above 70% is good enough: drop the failures.
        if (score > 0.7) {
            fail.clear();
        }
        return score;
    }

    /** Lists the source semantics that always convert to the given geographic target. */
    private static List<String> sourcesConvertibleTo(String toSemantic) {
        switch (toSemantic) {
            case "longitude":
            case "latitude":
                return Arrays.asList("country", "province", "city", "postcode", "coordinate");
            case "coordinate":
                return Arrays.asList("country", "province", "city", "postcode");
            case "country":
                return Arrays.asList("telephone", "coordinate");
            case "province":
            case "city":
                return Arrays.asList("id", "telephone", "coordinate");
            default:
                return Collections.emptyList();
        }
    }

    /**
     * Builds the SQL that creates a view applying a semantic-type change to one column.
     *
     * Side effects on {@code data}: {@code label} is set from the action enum and
     * {@code table} is replaced by the name of the new target view.
     *
     * @param columnTypes column name to type mapping; only the key order is used
     * @param data        request with keys {@code action}, {@code table}, {@code col},
     *                    {@code semantic}, {@code toSemantic}, {@code columnName} and the
     *                    optional {@code order} and {@code update}
     * @param taskId      task id embedded in the target view name
     * @return the create-view SQL
     */
    public String semanticTransform(Map<String, Integer> columnTypes, JSONObject data, Long taskId) {
        ActionEnum actionEnum = ActionEnum.valueOf(data.getString("action"));
        data.put("label", actionEnum.label());
        String sourceTable = ToolUtil.alignTableName(data.getString("table"), 0L);
        String targetTable = String.format(SqlTemplate.SCHEMA + ".view_tclean_%s_%s", taskId, System.currentTimeMillis());
        data.put("table", targetTable);
        String col = data.getString("col");
        String semantic = data.getString("semantic");
        if (semantic == null) {
            semantic = "null";
        }
        String toSemantic = data.getString("toSemantic");
        if (toSemantic == null) {
            // Treat a missing target like a missing source semantic instead of throwing.
            toSemantic = "null";
        }
        String selectSql;
        List<String> cols = Lists.newLinkedList(columnTypes.keySet());

        if (toSemantic.equals("null")) {
            selectSql = String.format("select * from %s", sourceTable);
        } else {
            String newCol = data.getString("columnName");
            newCol = SqlUtil.formatPGSqlColName(newCol);
            // Position is looked up with the raw name, before it is formatted for SQL.
            int index = cols.indexOf(col);
            col = SqlUtil.formatPGSqlColName(col);
            JSONObject ret = null;

            boolean flag = true;
            switch (semantic) {
                case "id":
                    ret = transformId(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "country":
                    ret = transformCountry(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "province":
                    ret = transformProvince(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "city":
                    ret = transformCity(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "postcode":
                    ret = transformPostcode(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "telephone":
                    ret = transformTelephone(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "ip":
                    ret = transformIp(col, newCol, cols, toSemantic, sourceTable);
                    break;
                case "coordinate":
                    ret = transformCoordinate(col, newCol, cols, toSemantic, sourceTable);
                    break;
                default:
                    flag = false;
            }

            List<String> formatCols;
            String formatCol;
            String fromSql = sourceTable + "";
            if (ret != null) {
                flag = ret.getBoolean("flag");
            }

            if (flag) {
                // A dedicated transform produced the select list, derived column and from-clause.
                @SuppressWarnings("unchecked")
                List<String> derivedCols = (List<String>) ret.get("formatCols");
                formatCols = derivedCols;
                formatCol = ret.getString("formatCol");
                fromSql = ret.getString("fromSql");
            } else {
                formatCols = SqlUtil.formatPGSqlCols(cols);
                String baseExpr;
                if (data.getString("order") != null) {
                    baseExpr = String.format(DataCleanSqlHelper.TRANSFORM_SQL, col,
                            DataTypeEnum.VARCHAR.getValue());
                } else {
                    baseExpr = col;
                }
                JSONObject update = data.getJSONObject("update");
                // An empty mapping would produce "case  else ... end", which is not valid SQL.
                if (update != null && !update.isEmpty()) {
                    StringBuilder updateSql = new StringBuilder();
                    for (Map.Entry<String, Object> entry : update.entrySet()) {
                        String from = escapeSqlLiteral(entry.getKey());
                        Object to = entry.getValue();
                        if (to == null || "null".equals(to)) {
                            updateSql.append(String
                                    .format("when %s in ('%s') then null ", col, from));
                        } else {
                            updateSql.append(String
                                    .format("when %s in ('%s') then '%s' ", col, from,
                                            escapeSqlLiteral(String.valueOf(to))));
                        }
                    }
                    formatCol = String
                            .format("case %s else %s end as %s", updateSql, col, newCol);
                } else {
                    formatCol = String.format("%s as %s", baseExpr, newCol);
                }
            }

            // Insert the derived column right after the source column.
            formatCols.add(index + 1, formatCol);
            selectSql = String.format("select %s from %s", Joiner.on(",").join(formatCols.iterator()), fromSql);
        }
        return String.format(SqlTemplate.CREATE_VIEW_SQL, targetTable, selectSql);
    }

    /**
     * Doubles single quotes so user-supplied text cannot terminate the SQL string literal.
     * NOTE(review): if the database runs with backslash escapes enabled in plain string
     * literals, backslashes need handling as well - confirm the server setting.
     */
    private static String escapeSqlLiteral(String raw) {
        return raw.replace("'", "''");
    }

    /**
     * Builds the SQL fragments for deriving a province, city or gender (disorder) column from
     * an ID-number column.
     *
     * Province and city are resolved by joining the first four characters of the ID against
     * {@code dataset._city_mapper_}; gender is read from the parity of the 17th character.
     *
     * @param col         formatted source column
     * @param newCol      formatted name of the derived column
     * @param cols        raw names of all source columns
     * @param toSemantic  target semantic
     * @param sourceTable source table name
     * @return JSON with {@code flag}; when true also {@code formatCol}, {@code formatCols}
     *         and {@code fromSql}
     */
    private JSONObject transformId(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();
        List<String> selectedCols;
        String derivedCol;
        String fromClause;

        switch (toSemantic) {
            case "province":
            case "city": {
                // Both targets share the same join; only the mapper column differs.
                String joinCondition = String.format("a.id = substring(cast(b.%s as varchar), 1, 4)", col);
                selectedCols = new ArrayList<>();
                for (String name : cols) {
                    selectedCols.add(String.format("b.\"%s\" as \"%s\"", name, name));
                }
                fromClause = String.format("dataset._city_mapper_ a join %s b on %s;",
                        sourceTable, joinCondition);
                String mapperCol = toSemantic.equals("province") ? "a.province" : "a.zh_full";
                derivedCol = String.format("%s as %s", mapperCol, newCol);
                break;
            }
            case "disorder": {
                selectedCols = SqlUtil.formatPGSqlCols(cols);
                String parity = String.format("mod(cast(substring(cast(%s as varchar), 17, 1) as int), 2)", col);
                derivedCol = String.format("case when %s in (1) then '男' when %s in (0) then '女' end as %s ",
                        parity, parity, newCol);
                fromClause = sourceTable + "";
                break;
            }
            default:
                result.put("flag", false);
                return result;
        }

        result.put("formatCol", derivedCol);
        result.put("formatCols", selectedCols);
        result.put("fromSql", fromClause);
        result.put("flag", true);
        return result;
    }

    /**
     * Builds the SQL fragments for deriving a longitude, latitude or coordinate column from a
     * country column by joining against {@code dataset._country_mapper_}.
     *
     * @param col         formatted source column
     * @param newCol      formatted name of the derived column
     * @param cols        raw names of all source columns
     * @param toSemantic  target semantic
     * @param sourceTable source table name
     * @return JSON with {@code flag}; when true also {@code formatCol}, {@code formatCols}
     *         and {@code fromSql}
     */
    private JSONObject transformCountry(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();

        List<String> joinConditions = SemanticUtil
                .semanticItems(col, SemanticConstant.COUNTRY_COL, "a.", "b.");
        List<String> selectedCols = new ArrayList<>();
        for (String name : cols) {
            selectedCols.add(String.format("b.\"%s\" as \"%s\"", name, name));
        }
        String fromClause = String.format("dataset._country_mapper_ a join %s b on %s;",
                sourceTable, Joiner.on(" or ").join(joinConditions));

        // Pick the mapper expression for the target; anything else is not supported here.
        String mapperExpr;
        switch (toSemantic) {
            case "longitude":
                mapperExpr = "a.lon";
                break;
            case "latitude":
                mapperExpr = "a.lat";
                break;
            case "coordinate":
                mapperExpr = "(a.lon || ',' || a.lat)";
                break;
            default:
                result.put("flag", false);
                return result;
        }

        result.put("formatCol", String.format("%s as %s", mapperExpr, newCol));
        result.put("formatCols", selectedCols);
        result.put("fromSql", fromClause);
        result.put("flag", true);
        return result;
    }

    /**
     * Builds the SQL fragments that derive a longitude, latitude or coordinate column
     * from a province column by joining the source table against
     * {@code dataset._province_mapper_}.
     *
     * @param col         source column used to build the join conditions
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through unchanged
     * @param toSemantic  target semantic: {@code "longitude"}, {@code "latitude"} or
     *                    {@code "coordinate"}
     * @param sourceTable table to read from; aliased as {@code b} in the join
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformProvince(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();

        // Join conditions between the mapper table (alias a) and the source table (alias b).
        List<String> joinConditions = SemanticUtil
                .semanticItems(col, SemanticConstant.PROVINCE_COL, "a.", "b.");

        // Every existing column is selected from the source side under its own quoted name.
        List<String> passThroughCols = cols.stream()
                .map(name -> String.format("b.\"%s\" as \"%s\"", name, name))
                .collect(Collectors.toCollection(ArrayList::new));

        String joinSql = String.format("dataset._province_mapper_ a join %s b on %s;",
                sourceTable,
                Joiner.on(" or ").join(joinConditions));

        // equals() is invoked on toSemantic itself, so a null target raises a NullPointerException.
        String derivedCol;
        if (toSemantic.equals("longitude")) {
            derivedCol = String.format("a.lon as %s", newCol);
        } else if (toSemantic.equals("latitude")) {
            derivedCol = String.format("a.lat as %s", newCol);
        } else if (toSemantic.equals("coordinate")) {
            derivedCol = String.format("(a.lon || ',' || a.lat) as %s", newCol);
        } else {
            result.put("flag", false);
            return result;
        }

        result.put("formatCol", derivedCol);
        result.put("formatCols", passThroughCols);
        result.put("fromSql", joinSql);
        result.put("flag", true);
        return result;
    }

    /**
     * Builds the SQL fragments that derive a longitude, latitude or coordinate column
     * from a city column by joining the source table against
     * {@code dataset._city_mapper_}.
     *
     * @param col         source column used to build the join conditions
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through unchanged
     * @param toSemantic  target semantic: {@code "longitude"}, {@code "latitude"} or
     *                    {@code "coordinate"}
     * @param sourceTable table to read from; aliased as {@code b} in the join
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformCity(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();

        // Join conditions between the mapper table (alias a) and the source table (alias b).
        List<String> joinConditions = SemanticUtil
                .semanticItems(col, SemanticConstant.CITY_COL, "a.", "b.");

        // Every existing column is selected from the source side under its own quoted name.
        List<String> passThroughCols = cols.stream()
                .map(name -> String.format("b.\"%s\" as \"%s\"", name, name))
                .collect(Collectors.toCollection(ArrayList::new));

        String joinSql = String.format("dataset._city_mapper_ a join %s b on %s;",
                sourceTable,
                Joiner.on(" or ").join(joinConditions));

        // equals() is invoked on toSemantic itself, so a null target raises a NullPointerException.
        String derivedCol;
        if (toSemantic.equals("longitude")) {
            derivedCol = String.format("a.lon as %s", newCol);
        } else if (toSemantic.equals("latitude")) {
            derivedCol = String.format("a.lat as %s", newCol);
        } else if (toSemantic.equals("coordinate")) {
            derivedCol = String.format("(a.lon || ',' || a.lat) as %s", newCol);
        } else {
            result.put("flag", false);
            return result;
        }

        result.put("formatCol", derivedCol);
        result.put("formatCols", passThroughCols);
        result.put("fromSql", joinSql);
        result.put("flag", true);
        return result;
    }

    /**
     * Builds the SQL fragments that derive a longitude, latitude or coordinate column
     * from a postcode column. The source table is joined against
     * {@code dataset._city_mapper_} on {@code a.zipcode = cast(b.<col> as varchar)}.
     *
     * @param col         source column compared with the mapper's {@code zipcode}
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through unchanged
     * @param toSemantic  target semantic: {@code "longitude"}, {@code "latitude"} or
     *                    {@code "coordinate"}
     * @param sourceTable table to read from; aliased as {@code b} in the join
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformPostcode(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();

        // The source column is cast to varchar before being compared with the mapper's zipcode.
        String joinCondition = String.format("a.zipcode = cast(b.%s as varchar)", col);

        // Every existing column is selected from the source side under its own quoted name.
        List<String> passThroughCols = cols.stream()
                .map(name -> String.format("b.\"%s\" as \"%s\"", name, name))
                .collect(Collectors.toCollection(ArrayList::new));

        String joinSql = String.format("dataset._city_mapper_ a join %s b on %s;",
                sourceTable, joinCondition);

        // equals() is invoked on toSemantic itself, so a null target raises a NullPointerException.
        String derivedCol;
        if (toSemantic.equals("longitude")) {
            derivedCol = String.format("a.lon as %s", newCol);
        } else if (toSemantic.equals("latitude")) {
            derivedCol = String.format("a.lat as %s", newCol);
        } else if (toSemantic.equals("coordinate")) {
            derivedCol = String.format("(a.lon || ',' || a.lat) as %s", newCol);
        } else {
            result.put("flag", false);
            return result;
        }

        result.put("formatCol", derivedCol);
        result.put("formatCols", passThroughCols);
        result.put("fromSql", joinSql);
        result.put("flag", true);
        return result;
    }

    /**
     * Builds the SQL fragments that derive a country, province or city column from a
     * telephone-number column.
     *
     * <p>The values of {@code col} are read from {@code sourceTable}, each one is resolved
     * to a location description with the offline phone-number geocoder, and the results
     * are emitted as one {@code CASE ... END} expression mapping each phone number to its
     * derived value.
     *
     * <p>Parsing rules: a number containing {@code -} or whitespace is split on
     * {@code -}, whitespace and parentheses; the first token is taken as the country code
     * and the remaining tokens are concatenated into the national number. Any other number
     * is parsed as a national number with country code 86. A number that cannot be parsed
     * or resolved is mapped to SQL {@code NULL}.
     *
     * @param col         source column holding the phone numbers
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through, formatted by
     *                    {@code SqlUtil.formatPGSqlCols}
     * @param toSemantic  target semantic: {@code "country"}, {@code "province"} or
     *                    {@code "city"}
     * @param sourceTable table to read from
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformTelephone(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject ret = new JSONObject();
        switch (toSemantic) {
            case "country":
            case "province":
            case "city":
                break;
            default:
                ret.put("flag", false);
                return ret;
        }

        PhoneNumberOfflineGeocoder geocoder = PhoneNumberOfflineGeocoder
                .getInstance();
        PhoneNumber pn = new PhoneNumber();
        JSONObject update = new JSONObject();

        // String.contains() matches literally, so contains("\\s") never detected whitespace.
        // A real regex is used to recognise "-" or whitespace separated numbers.
        final java.util.regex.Pattern separator = java.util.regex.Pattern.compile("-|\\s");

        Set<String> phoneNumbers = gpDataProvider.executeQueryAsOneSet(
                String.format("select %s from %s", col, sourceTable));
        for (String phoneNumber : phoneNumbers) {
            try {
                int countryCode = 86; // China is the default when no country code is present
                long phone;
                if (separator.matcher(phoneNumber).find()) {
                    String[] phoneSplit = phoneNumber.split("-|\\s|\\(|\\)");
                    countryCode = Integer.parseInt(phoneSplit[0]);
                    StringBuilder nationalNumber = new StringBuilder();
                    for (int i = 1; i < phoneSplit.length; i++) {
                        nationalNumber.append(phoneSplit[i]);
                    }
                    phone = Long.parseLong(nationalNumber.toString());
                } else {
                    phone = Long.parseLong(phoneNumber);
                }
                pn.setCountryCode(countryCode);
                pn.setNationalNumber(phone);
                String location = geocoder
                        .getDescriptionForNumber(pn, Locale.CHINESE);

                if (countryCode == 86) {
                    if (toSemantic.equals("country")) {
                        update.put(phoneNumber, "中国");
                    } else {
                        String province = "";
                        String city = "";
                        if (location.length() == 3) {
                            // Three-character descriptions are used as both province and city.
                            province = city = location;
                        } else if (location.contains("省")) {
                            province = location.split("省")[0];
                            city = location.split("省")[1];
                        } else if (location.startsWith("内蒙古")) {
                            province = "内蒙古";
                            city = location.substring(3);
                        } else {
                            // Otherwise the first two characters are the province.
                            // A shorter description throws and is mapped to NULL by the catch below.
                            province = location.substring(0, 2);
                            city = location.substring(2);
                        }
                        if (toSemantic.equals("province")) {
                            update.put(phoneNumber, province);
                        } else if (toSemantic.equals("city")) {
                            update.put(phoneNumber, city);
                        }
                    }
                } else {
                    if (toSemantic.equals("country")) {
                        update.put(phoneNumber, location);
                    } else {
                        update.put(phoneNumber, "null");
                    }
                }
            } catch (Exception ignored) {
                // Best effort: anything that cannot be parsed or resolved becomes SQL NULL.
                update.put(phoneNumber, "null");
            }
        }

        StringBuilder updateSql = new StringBuilder();
        for (Map.Entry<String, Object> entry : update.entrySet()) {
            // Double single quotes so the data cannot terminate the SQL string literal.
            String key = String.valueOf(entry.getKey()).replace("'", "''");
            Object value = entry.getValue();
            if (value == null || "null".equals(value)) {
                updateSql.append(String.format("when %s in ('%s') then null ", col, key));
            } else {
                updateSql.append(String.format("when %s in ('%s') then '%s' ", col, key,
                        String.valueOf(value).replace("'", "''")));
            }
        }
        // A CASE expression needs at least one WHEN branch; with nothing to map,
        // emit a typed NULL column instead of the invalid "case  end".
        String formatCol = updateSql.length() == 0
                ? String.format("cast(null as varchar) as %s", newCol)
                : String.format("case %s end as %s", updateSql.toString(), newCol);
        List<String> formatCols = SqlUtil.formatPGSqlCols(cols);
        String fromSql = sourceTable + "";

        ret.put("formatCol", formatCol);
        ret.put("formatCols", formatCols);
        ret.put("fromSql", fromSql);
        ret.put("flag", true);
        return ret;
    }

    /**
     * Builds the SQL fragments that derive a longitude, latitude, coordinate, country,
     * province, city or postcode column from an IP-address column.
     *
     * <p>The values of {@code col} are read from {@code sourceTable} and each one is looked
     * up in the IP database supplied by {@code urbanDataService}. For {@code "postcode"}
     * the resolved city name is additionally looked up in {@code dataset._city_mapper_}.
     * The results are emitted as one {@code CASE ... END} expression mapping each IP to its
     * derived value. An IP whose lookup fails or yields no value is mapped to SQL
     * {@code NULL}.
     *
     * <p>If reading the IPs fails, the error is logged. If the mapping ends up empty, the
     * derived column is emitted as a typed {@code NULL}.
     *
     * @param col         source column holding the IP addresses
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through, formatted by
     *                    {@code SqlUtil.formatPGSqlCols}
     * @param toSemantic  target semantic: {@code "longitude"}, {@code "latitude"},
     *                    {@code "coordinate"}, {@code "country"}, {@code "province"},
     *                    {@code "city"} or {@code "postcode"}
     * @param sourceTable table to read from
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformIp(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject ret = new JSONObject();
        switch (toSemantic) {
            case "longitude":
            case "latitude":
            case "coordinate":
            case "country":
            case "province":
            case "city":
            case "postcode":
                break;
            default:
                ret.put("flag", false);
                return ret;
        }

        DatabaseReader ipDatabase = urbanDataService.getIpDatabase();
        JSONObject update = new JSONObject();
        Connection conn = null;
        Statement st = null;
        try {
            conn = gpDataProvider.getConn(1L);
            st = conn.createStatement();
            Set<String> ips = gpDataProvider.executeQueryAsOneSet(st, String.format("select %s from %s", col, sourceTable));
            for (String ip : ips) {
                try {
                    InetAddress ipAddress = InetAddress.getByName(ip);
                    CityResponse response = ipDatabase.city(ipAddress);
                    switch (toSemantic) {
                        case "longitude":
                            update.put(ip,
                                    String.valueOf(response.getLocation().getLongitude()));
                            break;
                        case "latitude":
                            update.put(ip,
                                    String.valueOf(response.getLocation().getLatitude()));
                            break;
                        case "coordinate":
                            String lon = String
                                    .valueOf(response.getLocation().getLongitude());
                            String lat = String
                                    .valueOf(response.getLocation().getLatitude());
                            update.put(ip, String.format("%s,%s", lon, lat));
                            break;
                        case "country":
                            update.put(ip, response.getCountry().getNames().get("zh-CN"));
                            break;
                        case "province":
                            update.put(ip, response.getSubdivisions().get(0).getNames().get("zh-CN"));
                            break;
                        case "city":
                            update.put(ip, response.getCity().getNames().get("zh-CN"));
                            break;
                        case "postcode":
                            String zh = response.getCity().getNames().get("zh-CN");
                            // Double single quotes so the name cannot terminate the SQL literal.
                            String sql = String.format(
                                    "select zipcode from dataset._city_mapper_ where zh_full='%s'",
                                    String.valueOf(zh).replace("'", "''"));
                            String zipcode = gpDataProvider.executeQuery(st, sql)
                                    .getJSONObject(0).getString("zipcode");
                            update.put(ip, zipcode);
                            break;
                    }
                } catch (Exception ignored) {
                    // Best effort: an IP that cannot be resolved becomes SQL NULL.
                    update.put(ip, "null");
                }
            }
        } catch (Exception e) {
            // Keep the stack trace; the message alone is often null or uninformative.
            logger.error(e.getMessage(), e);
        } finally {
            // The statement is closed here because the JDBCUtil.close call below is not given it.
            if (st != null) {
                try {
                    st.close();
                } catch (Exception closeError) {
                    logger.warn("failed to close statement", closeError);
                }
            }
            JDBCUtil.close(conn, null, null);
        }

        StringBuilder updateSql = new StringBuilder();
        for (Map.Entry<String, Object> entry : update.entrySet()) {
            // Double single quotes so the data cannot terminate the SQL string literal.
            String key = String.valueOf(entry.getKey()).replace("'", "''");
            Object value = entry.getValue();
            // The lookups can store a real null (e.g. no "zh-CN" name); treat it like "null".
            if (value == null || "null".equals(value)) {
                updateSql.append(String.format("when %s in ('%s') then null ", col, key));
            } else {
                updateSql.append(String.format("when %s in ('%s') then '%s' ", col, key,
                        String.valueOf(value).replace("'", "''")));
            }
        }
        // A CASE expression needs at least one WHEN branch; with nothing to map,
        // emit a typed NULL column instead of the invalid "case  end".
        String formatCol = updateSql.length() == 0
                ? String.format("cast(null as varchar) as %s", newCol)
                : String.format("case %s end as %s", updateSql.toString(), newCol);
        List<String> formatCols = SqlUtil.formatPGSqlCols(cols);
        String fromSql = sourceTable + "";

        ret.put("formatCol", formatCol);
        ret.put("formatCols", formatCols);
        ret.put("fromSql", fromSql);
        ret.put("flag", true);
        return ret;
    }

    /**
     * Builds the SQL fragments that derive a longitude, latitude, country, province or
     * city column from a coordinate column, reading directly from the source table.
     *
     * <p>Longitude and latitude are the first and second comma-separated parts of the
     * coordinate value. Country, province and city are obtained through the database
     * function {@code pipeline.sys_func_reverse_geocode}.
     *
     * @param col         source column holding the coordinate value
     * @param newCol      alias given to the derived column
     * @param cols        source-table columns carried through, formatted by
     *                    {@code SqlUtil.formatPGSqlCols}
     * @param toSemantic  target semantic: {@code "longitude"}, {@code "latitude"},
     *                    {@code "country"}, {@code "province"} or {@code "city"}
     * @param sourceTable table to read from
     * @return a JSON object holding only {@code flag=false} when {@code toSemantic} is not
     *         supported; otherwise {@code formatCol}, {@code formatCols}, {@code fromSql}
     *         and {@code flag=true}
     */
    private JSONObject transformCoordinate(String col, String newCol, List<String> cols, String toSemantic, String sourceTable) {
        JSONObject result = new JSONObject();
        List<String> passThroughCols = SqlUtil.formatPGSqlCols(cols);
        String tableSql = sourceTable + "";

        // equals() is invoked on toSemantic itself, so a null target raises a NullPointerException.
        String derivedCol;
        if (toSemantic.equals("longitude")) {
            derivedCol = String.format("split_part(%s, ',', 1) as %s", col, newCol);
        } else if (toSemantic.equals("latitude")) {
            derivedCol = String.format("split_part(%s, ',', 2) as %s", col, newCol);
        } else if (toSemantic.equals("country")) {
            derivedCol = String.format("pipeline.sys_func_reverse_geocode(%s, 'country') as %s", col, newCol);
        } else if (toSemantic.equals("province")) {
            derivedCol = String.format("pipeline.sys_func_reverse_geocode(%s, 'province') as %s", col, newCol);
        } else if (toSemantic.equals("city")) {
            derivedCol = String.format("pipeline.sys_func_reverse_geocode(%s, 'city') as %s", col, newCol);
        } else {
            result.put("flag", false);
            return result;
        }

        result.put("formatCol", derivedCol);
        result.put("formatCols", passThroughCols);
        result.put("fromSql", tableSql);
        result.put("flag", true);
        return result;
    }
}
